{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Elasticsearch"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "This example notebook shows how to write data to and read data from Elasticsearch using Spark. It uses the [American Kennel Club dog breed data](https://data.world/len/dog-canine-breed-size-akc/workspace/file?filename=AKC+Breed+Info.csv) dataset."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Setup"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "1. Download the dataset\n",
    "```\n",
    "wget -O akc_breed_info.csv https://query.data.world/s/msmjhcmdjslsvjzcaqmtreu52gkuno\n",
    "```"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "2. Upload `akc_breed_info.csv` to the Resources dataset of your project"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Writing to Elasticsearch"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "import io.hops.util.Hops\n",
      "df: org.apache.spark.sql.DataFrame = [Breed: string, height_low_inches: string ... 3 more fields]\n"
     ]
    }
   ],
   "source": [
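    "import io.hops.util.Hops\n",
    "\n",
    "// Read the uploaded CSV from the project's Resources dataset; every column is loaded as a string\n",
    "val df = spark.read.option(\"header\",\"true\").csv(\"hdfs:///Projects/\"+ Hops.getProjectName() +\"/Resources/akc_breed_info.csv\")\n",
    "\n",
    "// Write the DataFrame to the index \"newindex\", replacing any existing contents\n",
    "(df.write\n",
    "  .format(\"org.elasticsearch.spark.sql\")\n",
    "  .options(Hops.getElasticConfiguration(\"newindex\"))\n",
    "  .mode(\"Overwrite\")\n",
    "  .save())"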
   ]
  },
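  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The cell above reads every CSV column as a string, and the file appears to mark missing measurements with the literal text `na`. If typed columns are preferable, the following is a minimal sketch and not part of the original example. It assumes the notebook's `spark` session, that `na` is the only marker for missing values, and Spark's default (non-ANSI) casting, where strings that are not numeric become null. The names `raw` and `typed` are illustrative.\n",
    "\n",
    "```scala\n",
    "import io.hops.util.Hops\n",
    "import org.apache.spark.sql.functions.col\n",
    "\n",
    "// Read the same CSV, but treat the literal string \"na\" as null (assumed missing-value marker)\n",
    "val raw = spark.read\n",
    "  .option(\"header\", \"true\")\n",
    "  .option(\"nullValue\", \"na\")\n",
    "  .csv(\"hdfs:///Projects/\" + Hops.getProjectName() + \"/Resources/akc_breed_info.csv\")\n",
    "\n",
    "// Cast every column except Breed to int; with default settings, non-numeric values become null\n",
    "val typed = raw.columns\n",
    "  .filter(_ != \"Breed\")\n",
    "  .foldLeft(raw) { (d, c) => d.withColumn(c, col(c).cast(\"int\")) }\n",
    "\n",
    "typed.printSchema()\n",
    "```\n",
    "\n",
    "Calling `typed.na.drop` then removes breeds with missing measurements, because those values are real nulls rather than strings."
   ]
  },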
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Reading from Elasticsearch"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "reader: org.apache.spark.sql.DataFrameReader = org.apache.spark.sql.DataFrameReader@416ac438\n",
      "df: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [Breed: string, height_high_inches: string ... 3 more fields]\n",
      "+--------------------+------------------+-----------------+---------------+--------------+\n",
      "|               Breed|height_high_inches|height_low_inches|weight_high_lbs|weight_low_lbs|\n",
      "+--------------------+------------------+-----------------+---------------+--------------+\n",
      "|       Affenpinscher|                12|                9|             12|             8|\n",
      "|        Afghan Hound|                27|               25|             60|            50|\n",
      "|     Airdale Terrier|                24|               22|             45|            45|\n",
      "|               Akita|                28|               26|            120|            80|\n",
      "|    Alaskan Malamute|                na|               na|             na|            na|\n",
      "|     American Eskimo|                19|                9|             30|            25|\n",
      "|   American Foxhound|                25|               22|             70|            65|\n",
      "|American Stafford...|                19|               17|             50|            40|\n",
      "|American Water Sp...|                18|               15|             45|            25|\n",
      "|  Anatolian Sheepdog|                29|               27|            150|           100|\n",
      "|Australian Cattle...|                20|               17|             45|            35|\n",
      "| Australian Shepherd|                23|               18|             60|            40|\n",
      "|  Australian Terrier|                10|               10|             14|            10|\n",
      "|             Basenji|                17|               17|             22|            20|\n",
      "|        Basset Hound|                14|               14|             50|            40|\n",
      "|              Beagle|                16|               13|             30|            18|\n",
      "|      Bearded Collie|                22|               20|             60|            40|\n",
      "|           Beauceron|                27|               24|            120|           100|\n",
      "|  Bedlington Terrier|                16|               15|             23|            18|\n",
      "|    Belgian Malinois|                26|               22|             65|            60|\n",
      "+--------------------+------------------+-----------------+---------------+--------------+\n",
      "only showing top 20 rows\n",
      "\n"
     ]
    }
   ],
   "source": [
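    "// Reuse the same connection options and index name that were used for the write\n",
    "val reader = spark.read.format(\"org.elasticsearch.spark.sql\").options(Hops.getElasticConfiguration(\"newindex\"))\n",
    "// na.drop removes rows containing nulls; the literal string \"na\" is not null, so those rows remain\n",
    "val df = reader.load().na.drop.orderBy($\"breed\")\n",
    "df.show()"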
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Spark",
   "language": "",
   "name": "sparkkernel"
  },
  "language_info": {
   "codemirror_mode": "text/x-scala",
   "mimetype": "text/x-scala",
   "name": "scala",
   "pygments_lexer": "scala"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 4
}